/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kafka.server

import java.util

import AbstractFetcherThread.ResultWithPartitions
import kafka.api._
import kafka.cluster.BrokerEndPoint
import kafka.log.LogConfig
import kafka.server.ReplicaFetcherThread._
import kafka.server.epoch.LeaderEpochCache
import kafka.zk.AdminZkClient
import org.apache.kafka.clients.FetchSessionHandler
import org.apache.kafka.common.requests.EpochEndOffset._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{MemoryRecords, Records}
import org.apache.kafka.common.requests.{EpochEndOffset, FetchResponse, ListOffsetRequest, ListOffsetResponse, OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse, FetchRequest => JFetchRequest}
import org.apache.kafka.common.utils.{LogContext, Time}

import scala.collection.JavaConverters._
import scala.collection.{Map, mutable}

/**
 * Fetcher thread that copies data for partitions led by `sourceBroker` into this broker's local
 * follower replicas.
 *
 * The Fetch, OffsetsForLeaderEpoch and ListOffsets request versions it sends are selected from
 * `brokerConfig.interBrokerProtocolVersion`.
 *
 * @param name                       thread name; also used as the client id
 * @param fetcherId                  id of this fetcher, used in the log prefix and the network client id
 * @param sourceBroker               the leader broker this thread fetches from
 * @param brokerConfig               this broker's configuration
 * @param replicaMgr                 local replica manager that owns the follower replicas
 * @param metrics                    metrics registry handed to the default network sender
 * @param time                       clock handed to the default network sender
 * @param quota                      replication quota used to throttle fetches and record replicated bytes
 * @param leaderEndpointBlockingSend optional pre-built sender; when absent a
 *                                   `ReplicaFetcherBlockingSend` is created
 */
class ReplicaFetcherThread(name: String,
                           fetcherId: Int,
                           sourceBroker: BrokerEndPoint,
                           brokerConfig: KafkaConfig,
                           replicaMgr: ReplicaManager,
                           metrics: Metrics,
                           time: Time,
                           quota: ReplicaQuota,
                           leaderEndpointBlockingSend: Option[BlockingSend] = None)
  extends AbstractFetcherThread(name = name,
                                clientId = name,
                                sourceBroker = sourceBroker,
                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                isInterruptible = false,
                                includeLogTruncation = true) {

  type REQ = FetchRequest
  type PD = PartitionData

  private val replicaId = brokerConfig.brokerId
  private val logContext = new LogContext(s"[ReplicaFetcher replicaId=$replicaId, leaderId=${sourceBroker.id}, " +
    s"fetcherId=$fetcherId] ")
  this.logIdent = logContext.logPrefix

  private val leaderEndpoint = leaderEndpointBlockingSend.getOrElse(
    new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId,
      s"broker-$replicaId-fetcher-$fetcherId", logContext))

  // Visible for testing
  // Highest Fetch request version allowed by the configured inter-broker protocol version.
  private[server] val fetchRequestVersion: Short =
    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 8
    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0) 7
    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 4
    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3
    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2
    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1
    else 0

  // Visible for testing
  private[server] val offsetForLeaderEpochRequestVersion: Short =
    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV0) 1
    else 0

  // Visible for testing
  private[server] val listOffsetRequestVersion: Short =
    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 3
    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV0) 2
    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) 1
    else 0

  private val fetchMetadataSupported = brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0
  private val maxWait = brokerConfig.replicaFetchWaitMaxMs
  private val minBytes = brokerConfig.replicaFetchMinBytes
  private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
  private val fetchSize = brokerConfig.replicaFetchMaxBytes
  private val shouldSendLeaderEpochRequest: Boolean = brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV2
  private val fetchSessionHandler = new FetchSessionHandler(logContext, sourceBroker.id)

  /**
   * Leader-epoch cache of the local replica of `tp`.
   *
   * Returns None when there is no local replica or when the replica exposes no epoch cache, so the
   * caller can report the partition as an error instead of failing with NoSuchElementException.
   */
  private def epochCacheOpt(tp: TopicPartition): Option[LeaderEpochCache] = replicaMgr.getReplica(tp).flatMap(_.epochs)

  /** Initiates shutdown and, only on the call that actually starts it, closes the leader connection. */
  override def initiateShutdown(): Boolean = {
    val justShutdown = super.initiateShutdown()
    if (justShutdown) {
      leaderEndpoint.close()
    }
    justShutdown
  }

  /**
   * Appends records fetched for `topicPartition` to the local follower replica. It then updates the
   * replica's high watermark and log start offset from the leader's response.
   *
   * @param topicPartition partition the data belongs to
   * @param fetchOffset    offset the fetch was issued from
   * @param partitionData  the leader's response for this partition
   * @throws IllegalStateException if `fetchOffset` differs from the local replica's log end offset
   */
  def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData): Unit = {
    val replica = replicaMgr.getReplicaOrException(topicPartition)
    val partition = replicaMgr.getPartition(topicPartition).get
    val records = partitionData.toRecords

    maybeWarnIfOversizedRecords(records, topicPartition)
    // Only append when the fetch started exactly at the local log end offset; otherwise the fetched
    // batch would not line up with the local log.
    if (fetchOffset != replica.logEndOffset.messageOffset)
      throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
        topicPartition, fetchOffset, replica.logEndOffset.messageOffset))

    // As a follower, append the leader's records to the local replica log.
    partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)

    // The leader reports its high watermark with every fetch response. The follower's high watermark
    // is the smaller of its own (post-append) log end offset and that value. It can therefore lag,
    // but never exceed, the leader's high watermark as reported here.
    val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
    val leaderLogStartOffset = partitionData.logStartOffset

    replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
    // Possibly advance the local log start offset toward the leader's. The exact conditions live in
    // Replica.maybeIncrementLogStartOffset.
    replica.maybeIncrementLogStartOffset(leaderLogStartOffset)

    if (quota.isThrottled(topicPartition))
      quota.record(records.sizeInBytes)
    replicaMgr.brokerTopicStats.updateReplicationBytesIn(records.sizeInBytes)
  }

  /**
   * Logs an error when the fetch returned bytes but no valid bytes (`validBytes <= 0`), using
   * fetch request version 2 or lower. The error message attributes this to a message larger than
   * replica.fetch.max.bytes.
   */
  def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = {
    if (fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0)
      error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " +
        "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " +
        "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " +
        "equal or larger than your settings for max.message.bytes, both at a broker and topic level.")
  }

  /**
   * Handle a partition whose offset is out of range and return a new fetch offset.
   *
   * If the leader's log ends before the local log, the local log is truncated to the leader's end
   * offset. This is only allowed when the topic permits unclean leader election; otherwise the broker
   * exits. If the leader's log does not end before the local log, fetching resumes from the larger of
   * the leader's start offset and the local end offset. The local log is fully truncated when it lies
   * entirely before the leader's start offset.
   *
   * @throws FatalExitError if truncation is required but unclean leader election is disabled for the topic
   */
  def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
    val replica = replicaMgr.getReplicaOrException(topicPartition)
    val partition = replicaMgr.getPartition(topicPartition).get

    val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP)

    if (leaderEndOffset < replica.logEndOffset.messageOffset) {
      // The leader's log ends before ours (e.g. after an unclean leader election), so the local log
      // must be truncated to the leader's end offset, which may discard data.
      val adminZkClient = new AdminZkClient(replicaMgr.zkClient)
      if (!LogConfig.fromProps(brokerConfig.originals, adminZkClient.fetchEntityConfig(
        ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
        // Explain the fatal exit before the broker goes down; otherwise the cause is invisible to operators.
        error(s"Exiting because log truncation is not allowed for partition $topicPartition, current leader's " +
          s"latest offset $leaderEndOffset is less than replica's latest offset ${replica.logEndOffset.messageOffset}")
        throw new FatalExitError
      }

      warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
        s"leader's latest offset $leaderEndOffset")
      partition.truncateTo(leaderEndOffset, isFuture = false)
      replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, topicPartition, leaderEndOffset)
      leaderEndOffset
    } else {
      // The leader's end offset is >= our end offset, yet our fetch offset was out of range. Possible causes:
      // 1. This follower was down long enough that its end offset is below the leader's start offset.
      // 2. An unclean leader election happened.
      // Ask the leader for its earliest offset and resume from there if we are behind it.
      val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP)
      warn(s"Reset fetch offset for partition $topicPartition from ${replica.logEndOffset.messageOffset} to current " +
        s"leader's start offset $leaderStartOffset")
      // Fetch from whichever is larger: the leader's start offset or the local log end offset.
      val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
      if (leaderStartOffset > replica.logEndOffset.messageOffset) {
        // The whole local log precedes the leader's log: drop it and restart it at the leader's start offset.
        partition.truncateFullyAndStartAt(leaderStartOffset, isFuture = false)
      }
      offsetToFetch
    }
  }

  /** Backs off the given partitions (e.g. after a leader change) for replica.fetch.backoff.ms. */
  def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]): Unit = {
    if (partitions.nonEmpty)
      delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong)
  }

  /**
   * Sends `fetchRequest` to the leader and returns the per-partition response data.
   *
   * Returns Nil when the fetch session handler rejects the response. Any failure is first reported
   * to the session handler and then rethrown.
   */
  protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = {
    try {
      val clientResponse = leaderEndpoint.sendRequest(fetchRequest.underlying)
      val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse[Records]]
      if (!fetchSessionHandler.handleResponse(fetchResponse)) {
        Nil
      } else {
        fetchResponse.responseData.asScala.toSeq.map { case (key, value) =>
          key -> new PartitionData(value)
        }
      }
    } catch {
      case t: Throwable =>
        fetchSessionHandler.handleError(t)
        throw t
    }
  }

  /**
   * Asks the leader for the earliest or latest offset of `topicPartition` with a ListOffsets request.
   *
   * @param earliestOrLatest target timestamp; callers in this class pass
   *                         `ListOffsetRequest.EARLIEST_TIMESTAMP` or `ListOffsetRequest.LATEST_TIMESTAMP`
   * @throws Exception the exception of the partition-level error returned by the leader, if any
   */
  private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long): Long = {
    val requestBuilder = if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) {
        val partitions = Map(topicPartition -> (earliestOrLatest: java.lang.Long))
        ListOffsetRequest.Builder.forReplica(listOffsetRequestVersion, replicaId).setTargetTimes(partitions.asJava)
      } else {
        // Older protocol: ask for a single offset (maxNumOffsets = 1).
        val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1))
        ListOffsetRequest.Builder.forReplica(listOffsetRequestVersion, replicaId).setOffsetData(partitions.asJava)
      }
    val clientResponse = leaderEndpoint.sendRequest(requestBuilder)
    val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse]
    val partitionData = response.responseData.get(topicPartition)
    partitionData.error match {
      case Errors.NONE =>
        if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2)
          partitionData.offset
        else
          partitionData.offsets.get(0)
      case error => throw error.exception
    }
  }

  /**
   * Builds the fetch request for all partitions that are ready to fetch and not follower-throttled.
   * Partitions whose local replica throws KafkaStorageException are returned as errors.
   */
  override def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[FetchRequest] = {
    val partitionsWithError = mutable.Set[TopicPartition]()

    val builder = fetchSessionHandler.newBuilder()
    partitionMap.foreach { case (topicPartition, partitionFetchState) =>
      // We will not include a replica in the fetch request if it should be throttled.
      if (partitionFetchState.isReadyForFetch && !shouldFollowerThrottle(quota, topicPartition)) {
        try {
          val logStartOffset = replicaMgr.getReplicaOrException(topicPartition).logStartOffset
          // Each partition is fetched from its own fetch offset; fetchSize (replica.fetch.max.bytes)
          // is the per-partition size limit sent to the leader.
          builder.add(topicPartition, new JFetchRequest.PartitionData( partitionFetchState.fetchOffset, logStartOffset, fetchSize))
        } catch {
          case _: KafkaStorageException =>
            // The replica has already been marked offline due to log directory failure and the original failure should have already been logged.
            // This partition should be removed from ReplicaFetcherThread soon by ReplicaManager.handleLogDirFailure()
            partitionsWithError += topicPartition
        }
      }
    }

    val fetchData = builder.build()
    // minBytes / maxWait (replica.fetch.min.bytes / replica.fetch.wait.max.ms) let the leader hold the
    // request until enough data is available or the wait time elapses, instead of answering immediately.
    val requestBuilder = JFetchRequest.Builder.forReplica(fetchRequestVersion, replicaId, maxWait, minBytes, fetchData.toSend()).setMaxBytes(maxBytes).toForget(fetchData.toForget)
    if (fetchMetadataSupported) {
      requestBuilder.metadata(fetchData.metadata())
    }

    ResultWithPartitions(new FetchRequest(fetchData.sessionPartitions(), requestBuilder), partitionsWithError)
  }

  /**
   * Truncate the log for each partition's epoch based on leader's returned epoch and offset.
   * The logic for finding the truncation offset is implemented in AbstractFetcherThread.getOffsetTruncationState
   *
   * Partitions whose leader response carries an error, or whose truncation fails with
   * KafkaStorageException, are returned as errors so they are retried.
   */
  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): ResultWithPartitions[Map[TopicPartition, OffsetTruncationState]] = {
    val fetchOffsets = scala.collection.mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
    val partitionsWithError = mutable.Set[TopicPartition]()

    fetchedEpochs.foreach { case (tp, leaderEpochOffset) =>
      try {
        val replica = replicaMgr.getReplicaOrException(tp)
        val partition = replicaMgr.getPartition(tp).get

        if (leaderEpochOffset.hasError) {
          info(s"Retrying leaderEpoch request for partition ${replica.topicPartition} as the leader reported an error: ${leaderEpochOffset.error}")
          partitionsWithError += tp
        } else {
          val offsetTruncationState = getOffsetTruncationState(tp, leaderEpochOffset, replica)
          if (offsetTruncationState.offset < replica.highWatermark.messageOffset)
            warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark ${replica.highWatermark.messageOffset}")

          partition.truncateTo(offsetTruncationState.offset, isFuture = false)
          // mark the future replica for truncation only when we do last truncation
          if (offsetTruncationState.truncationCompleted)
            replicaMgr.replicaAlterLogDirsManager.markPartitionsForTruncation(brokerConfig.brokerId, tp, offsetTruncationState.offset)
          fetchOffsets.put(tp, offsetTruncationState)
        }
      } catch {
        case e: KafkaStorageException =>
          info(s"Failed to truncate $tp", e)
          partitionsWithError += tp
      }
    }

    ResultWithPartitions(fetchOffsets, partitionsWithError)
  }

  /**
   * Maps every partition that is truncating its log to the latest epoch in its local epoch cache.
   * Partitions without an epoch cache are returned as errors.
   */
  override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): ResultWithPartitions[Map[TopicPartition, Int]] = {
    val partitionEpochOpts = allPartitions
      .filter { case (_, state) => state.isTruncatingLog }
      .map { case (tp, _) => tp -> epochCacheOpt(tp) }.toMap

    val (partitionsWithEpoch, partitionsWithoutEpoch) = partitionEpochOpts.partition { case (_, epochCacheOpt) => epochCacheOpt.nonEmpty }

    debug(s"Build leaderEpoch request $partitionsWithEpoch")
    // `.get` is safe here: partitionsWithEpoch only holds non-empty options.
    val result = partitionsWithEpoch.map { case (tp, epochCacheOpt) => tp -> epochCacheOpt.get.latestEpoch() }
    ResultWithPartitions(result, partitionsWithoutEpoch.keys.toSet)
  }

  /**
   * Asks the leader for the end offset of each partition's epoch.
   *
   * If the request fails, every partition is mapped to an EpochEndOffset carrying the corresponding
   * error. When the inter-broker protocol is too old to send the request, every partition gets
   * Errors.NONE with undefined epoch and offset, so maybeTruncate() falls back to the high watermark.
   */
  override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
    if (shouldSendLeaderEpochRequest) {
      val partitionsAsJava = partitions.map { case (tp, epoch) => tp -> epoch.asInstanceOf[Integer] }.asJava
      val epochRequest = new OffsetsForLeaderEpochRequest.Builder(offsetForLeaderEpochRequestVersion, partitionsAsJava)
      try {
        val response = leaderEndpoint.sendRequest(epochRequest)
        val result = response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
        debug(s"Receive leaderEpoch response $result")
        result
      } catch {
        case t: Throwable =>
          warn(s"Error when sending leader epoch request for $partitions", t)

          // if we get any unexpected exception, mark all partitions with an error
          partitions.map { case (tp, _) =>
            tp -> new EpochEndOffset(Errors.forException(t), UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
          }
      }
    } else {
      // just generate a response with no error but UNDEFINED_OFFSET so that we can fall back to truncating using
      // high watermark in maybeTruncate()
      partitions.map { case (tp, _) =>
        tp -> new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
      }
    }
  }

  /**
   *  To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list,
   *  the quota is exceeded and the replica is not in sync.
   */
  private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicPartition): Boolean = {
    val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition.topic, topicPartition.partition)
    quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync
  }
}

object ReplicaFetcherThread {

  /**
   * Fetch request handed to the abstract fetcher: the session's per-partition fetch data plus the
   * Java request builder that is actually sent to the leader.
   *
   * @param sessionParts fetch data for the partitions in the current fetch session, keyed by partition
   * @param underlying   the Java FetchRequest builder
   */
  private[server] class FetchRequest(val sessionParts: util.Map[TopicPartition, JFetchRequest.PartitionData],
                                     val underlying: JFetchRequest.Builder)
      extends AbstractFetcherThread.FetchRequest {

    /** Fetch offset recorded for `topicPartition` in the session data. */
    def offset(topicPartition: TopicPartition): Long = {
      val partitionFetchData = sessionParts.get(topicPartition)
      partitionFetchData.fetchOffset
    }

    /** True when there are no session partitions and no partitions to forget. */
    override def isEmpty: Boolean =
      sessionParts.isEmpty && underlying.toForget().isEmpty

    override def toString: String = underlying.toString
  }

  /** Adapter exposing a Java FetchResponse partition entry through the abstract fetcher's interface. */
  private[server] class PartitionData(val underlying: FetchResponse.PartitionData[Records])
      extends AbstractFetcherThread.PartitionData {

    def error: Errors = underlying.error

    /** The partition's records; assumed to always be MemoryRecords for replica fetches. */
    def toRecords: MemoryRecords = underlying.records.asInstanceOf[MemoryRecords]

    def highWatermark: Long = underlying.highWatermark

    def logStartOffset: Long = underlying.logStartOffset

    /** None when the partition carries no error, otherwise the exception for that error code. */
    def exception: Option[Throwable] = {
      val partitionError = error
      if (partitionError == Errors.NONE) None
      else Some(partitionError.exception)
    }

    override def toString: String = underlying.toString
  }
}
